创建动作服务端(Python)
在 ROS2 中创建动作服务端可以使节点接收动作客户端发来的任务目标,执行长时间运行的任务,并实时返回进度反馈,最终返回任务的执行结果。本节通过实现经典的斐波那契数列(Fibonacci)动作服务端来展示动作服务端的创建方法。
Fibonacci 动作结构详解
在创建动作服务端和客户端之前,我们明确一下ROS 2中该动作的数据结构:
-
目标请求:
-
order: 10客户端请求动作服务器计算长度为10的斐波那契数列。 -
反馈信息:
partial_sequence: [0, 1, 1, 2, 3, 5]动作服务器执行任务期间,实时向客户端提供进展情况。 -
最终结果:
sequence: [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]动作服务器计算完成后,最终返回给客户端的斐波那契数列结果。
创建功能包
首先,在 ROS2 工作空间中创建一个新的 Python 功能包,并引入动作相关的依赖项:
打开终端,执行:
cd ~/ros2_ws/src
ros2 pkg create --build-type ament_python py_action_tutorial --dependencies rclpy action_tutorials_interfaces
在这段创建功能报的命令中使用 --dependencies 添加了如下两个功能包,这样后期就不需要配置 packages.xml 文件中的依赖项了:
rclpy:ROS 2 Python客户端库action_tutorials_interfaces:包含动作定义(这里使用官方提供的Fibonacci动作接口)
动作服务端源代码
在 ~/ros2_ws/src/py_action_tutorial/py_action_tutorial 目录下创建一个名为fibonacci_action_server.py 的Python文件,并粘贴以下代码:
import rclpy
from rclpy.action import ActionServer, GoalResponse, CancelResponse
from rclpy.node import Node
from action_tutorials_interfaces.action import Fibonacci
class FibonacciActionServer(Node):
def __init__(self):
super().__init__('fibonacci_action_server')
self._action_server = ActionServer(
self,
Fibonacci,
'fibonacci',
self.execute_callback,
goal_callback=self.goal_callback
)
def goal_callback(self,goal_request):
if goal_request.order<0:
self.get_logger().warn("Reject goal:order must>0")
return GoalResponse.REJECT
return GoalResponse.ACCEPT
def execute_callback(self, goal_handle):
self.get_logger().info('Executing goal...')
feedback_msg = Fibonacci.Feedback()
feedback_msg.partial_sequence = [0, 1]
for i in range(1, goal_handle.request.order):
feedback_msg.partial_sequence.append(
feedback_msg.partial_sequence[i] + feedback_msg.partial_sequence[i - 1]
)
self.get_logger().info(f'Feedback: {feedback_msg.partial_sequence}')
goal_handle.publish_feedback(feedback_msg)
goal_handle.succeed()
result = Fibonacci.Result()
result.sequence = feedback_msg.partial_sequence
return result
def main(args=None):
rclpy.init(args=args)
fibonacci_action_server = FibonacciActionServer()
rclpy.spin(fibonacci_action_server)
fibonacci_action_server.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
详细代码解释(逐步讲解)
下面将逐步解释上述代码的每个部分:
导入所需的模块
import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node
from action_tutorials_interfaces.action import Fibonacci
rclpy:ROS 2 Python 客户端库,提供ROS系统的基础功能。ActionServer:ROS 2动作服务器类,用于创建动作服务器节点。Node:ROS 2节点的基础类,所有ROS 2节点都从这个类继承。Fibonacci:ROS 2官方提供的动作类型,用于生成斐波那契数列。其结构包含目标、反馈和结果。
创建动作服务器节点类
class FibonacciActionServer(Node):
def __init__(self):
super().__init__('fibonacci_action_server')
self._action_server = ActionServer(
self,
Fibonacci,
'fibonacci',
self.execute_callback
)
-
定义一个继承自
Node的动作服务器类FibonacciActionServer。 -
在构造函数中,调用父类的构造函数
super().__init__('fibonacci_action_server')来初始化节点名称。 -
使用
ActionServer创建一个动作服务器实例:- 参数依次是当前节点实例(
self)、动作类型(Fibonacci)、动作的名称(fibonacci)、以及接收目标后执行任务的回调函数(execute_callback)。
- 参数依次是当前节点实例(
定义执行动作目标的回调函数
def execute_callback(self, goal_handle):
self.get_logger().info('Executing goal...')
feedback_msg = Fibonacci.Feedback()
feedback_msg.partial_sequence = [0, 1]
for i in range(1, goal_handle.request.order):
feedback_msg.partial_sequence.append(
feedback_msg.partial_sequence[i] + feedback_msg.partial_sequence[i - 1]
)
self.get_logger().info(f'Feedback: {feedback_msg.partial_sequence}')
goal_handle.publish_feedback(feedback_msg)
goal_handle.succeed()
result = Fibonacci.Result()
result.sequence = feedback_msg.partial_sequence
return result
-
当客户端发送目标请求(如计算10个斐波那契数)后,此回调函数将自动被调用。
-
goal_handle.request.order:获取客户端请求的目标数据(生成数列的长度)。 -
初始化反馈消息(
feedback_msg),从[0, 1]开始生成斐波那契数列。 -
在循环中,每计算出一个新的斐波那契数,就更新并发布反馈给客户端。
-
完成计算后调用
goal_handle.succeed()标记目标执行成功。 -
创建结果消息(
Fibonacci.Result),并将整个数列返回客户端。
主函数定义
def main(args=None):
rclpy.init(args=args)
fibonacci_action_server = FibonacciActionServer()
rclpy.spin(fibonacci_action_server)
fibonacci_action_server.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
-
rclpy.init(args=args)初始化ROS 2通信系统。 -
创建动作服务器节点实例(
fibonacci_action_server)。 -
rclpy.spin()使节点持续运行,监听并响应客户端发来的动作目标请求。 -
程序退出前调用
destroy_node()显式释放节点资源。 -
rclpy.shutdown()关闭ROS 2通信系统。
创建动作客户端(Python)
上一节我们介绍了如何创建 ROS2 动作服务器节点,本节我们将学习如何创建相应的动作客户端节点,用于向动作服务器发送目标并实时接收反馈和最终结果。
动作客户端完整代码
在功能包目录 ~/ros2_ws/src/py_action_tutorial/py_action_tutorial 中,创建名为 fibonacci_action_client.py 的文件,并加入以下代码:
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from action_tutorials_interfaces.action import Fibonacci
class FibonacciActionClient(Node):
def __init__(self):
super().__init__('fibonacci_action_client')
self._action_client = ActionClient(self, Fibonacci, 'fibonacci')
def send_goal(self, order):
goal_msg = Fibonacci.Goal()
goal_msg.order = order
self._action_client.wait_for_server()
self.get_logger().info('Action server available. Sending goal...')
self._send_goal_future = self._action_client.send_goal_async(
goal_msg,
feedback_callback=self.feedback_callback
)
self._send_goal_future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Goal rejected by server.')
return
self.get_logger().info('Goal accepted by server, waiting for result...')
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
def get_result_callback(self, future):
result = future.result().result
self.get_logger().info(f'Result received: {result.sequence}')
rclpy.shutdown()
def feedback_callback(self, feedback_msg):
feedback = feedback_msg.feedback
self.get_logger().info(f'Received feedback: {feedback.partial_sequence}')
def main(args=None):
rclpy.init(args=args)
action_client = FibonacciActionClient()
action_client.send_goal(10) # 请求生成长度为10的斐波那契数列
rclpy.spin(action_client)
if __name__ == '__main__':
main()
详细代码解释(逐步讲解)
接下来我们逐步解释上述客户端代码每个部分的功能:
导入必要模块和接口
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from action_tutorials_interfaces.action import Fibonacci
rclpy是ROS 2提供的Python客户端库。ActionClient是ROS 2动作客户端的基础类,用于与动作服务器通信。Node是所有ROS 2节点的基础类。Fibonacci是ROS 2提供的官方动作接口,定义了目标、反馈和结果的结构。
创建动作客户端节点类
class FibonacciActionClient(Node):
def __init__(self):
super().__init__('fibonacci_action_client')
self._action_client = ActionClient(self, Fibonacci, 'fibonacci')
-
创建一个名为
FibonacciActionClient的类,继承自ROS 2的基础节点类Node。 -
在构造函数中,调用父类构造函数,指定节点名称为
fibonacci_action_client。 -
创建动作客户端实例:
-
第一个参数是节点实例自身(
self)。 -
第二个参数是动作接口类型(
Fibonacci)。 -
第三个参数是动作服务器名称(
fibonacci),必须与服务端一致。
-
发送动作目标请求的方法(send_goal)
def send_goal(self, order):
goal_msg = Fibonacci.Goal()
goal_msg.order = order
self._action_client.wait_for_server()
self.get_logger().info('Action server available. Sending goal...')
self._send_goal_future = self._action_client.send_goal_async(
goal_msg,
feedback_callback=self.feedback_callback
)
self._send_goal_future.add_done_callback(self.goal_response_callback)
-
创建目标消息
goal_msg,设置生成数列的长度为用户指定的参数order。 -
调用
wait_for_server()等待动作服务器上线。 -
使用
send_goal_async()异步发送目标,并提供反馈回调函数feedback_callback,用于实时接收服务器反馈。 -
异步发送目标后返回一个future对象,并注册目标响应回调函数
goal_response_callback。
处理目标响应的回调函数(goal_response_callback)
def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Goal rejected by server.')
return
self.get_logger().info('Goal accepted by server, waiting for result...')
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
-
检查动作服务器是否接受了目标请求。
-
若未接受,则打印日志提示用户。
-
若接受,则调用
get_result_async()异步获取最终结果。 -
注册获取结果后的回调函数
get_result_callback。
接收并处理动作结果(get_result_callback)
def get_result_callback(self, future):
result = future.result().result
self.get_logger().info(f'Result received: {result.sequence}')
rclpy.shutdown()
-
当动作服务器完成任务并返回结果时,此函数被调用。
-
打印动作的执行结果(斐波那契数列)。
-
使用
rclpy.shutdown()关闭ROS 2通信系统。
实时接收动作反馈(feedback_callback)
def feedback_callback(self, feedback_msg):
feedback = feedback_msg.feedback
self.get_logger().info(f'Received feedback: {feedback.partial_sequence}')
-
在动作执行过程中,每次服务器发送反馈消息时,都会调用此函数。
-
打印当前数列生成的进度,让用户了解任务执行的实时状态。
定义客户端节点主函数(main)
def main(args=None):
rclpy.init(args=args)
action_client = FibonacciActionClient()
action_client.send_goal(10) # 发送目标请求生成长度为10的数列
rclpy.spin(action_client)
-
初始化 ROS2 通信系统。
-
创建动作客户端节点实例。
-
调用
send_goal(10)向服务器请求生成长度为10的斐波那契数列。 -
使用
rclpy.spin()启动事件循环,监听服务器反馈和结果,直至节点关闭。

图 2: ROS2 动作信息流程
结合图 2 和前面的代码,我们可以更清晰地梳理 ROS2 动作(Action)的通信流程。
在服务端,动作服务器实现了两个关键回调函数:
-
goal_callback():用于处理客户端发送的目标请求,决定是否接受该目标。
-
execute_callback():当目标被接受后,服务器在此函数中执行具体计算,并在过程中周期性发布反馈,最终返回结果。
在客户端,通信过程如下:
-
wait_for_server():客户端首先调用此函数等待动作服务器上线。如果服务器不可用,程序会阻塞在此处。
-
send_goal_async():服务器可用后,客户端通过该异步函数发送目标请求。为了在请求完成后自动处理结果,客户端会使用 add_done_callback() 将 goal_response_callback() 注册为完成回调函数。
-
goal_response_callback():当服务器返回目标受理结果(接受或拒绝)时,该回调会自动被触发。如果目标被接受,回调内部会调用 get_result_async() 异步请求最终计算结果。同时,客户端在目标执行期间会收到服务器通过 feedback_callback() 发布的反馈信息。
-
get_result_callback():当服务器完成执行并返回最终结果时,客户端会自动调用此回调,从而获得动作的最终结果。
构建与运行动作节点
由于在创建功能包的时候已经添加了 --dependencies 的依赖项,因此这里不需要对依赖项进行重新配置,只需要配置 setup.py 文件即可:
编辑 py_action_tutorial/setup.py,在 entry_points 中添加:
entry_points={
'console_scripts': [
'fibonacci_action_server=py_action_tutorial.fibonacci_action_server:main',
'fibonacci_action_client=py_action_tutorial.fibonacci_action_client:main',
],
},
构建功能包:
cd ~/ros2_ws
colcon build --packages-select py_action_tutorial
加载环境变量:
source install/setup.bash
确保动作服务器已启动:
ros2 run py_action_tutorial fibonacci_action_server
再启动客户端节点:
ros2 run py_action_tutorial fibonacci_action_client
客户端终端示例输出:
[INFO] [fibonacci_action_client]: Action server available. Sending goal...
[INFO] [fibonacci_action_client]: Goal accepted by server, waiting for result...
[INFO] [fibonacci_action_client]: Received feedback: [0, 1]
[INFO] [fibonacci_action_client]: Received feedback: [0, 1, 1]
[INFO] [fibonacci_action_client]: Received feedback: [0, 1, 1, 2]
...
[INFO] [fibonacci_action_client]: Result received: [0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55]
小结
本章介绍了 ROS2 中的动作(Action)机制,说明了它在处理 长时间运行、需要实时反馈且可中途取消的任务 时的优势。通过对 .action 文件结构、通信流程以及动作客户端与服务器角色的讲解。
在实践部分,本章通过 Fibonacci 示例展示了如何编写 Python 动作服务器与客户端,并介绍了常用的 ROS2 动作命令,帮助读者在实际项目中调试和使用 Action 接口。
通过本章的学习,读者已经具备在 ROS2 中使用 Action 实现复杂任务控制的能力,为进一步学习导航、机械臂动作等高级功能打下基础。